
[SPARK-56253][PYTHON][CONNECT] Make spark.read.json accept DataFrame input #55097

Closed

Yicong-Huang wants to merge 16 commits into apache:master from Yicong-Huang:SPARK-56253

Conversation

Yicong-Huang (Contributor) commented Mar 30, 2026:

What changes were proposed in this pull request?

Allow spark.read.json() to accept a DataFrame as input, in addition to file paths and RDDs. The first column of the input DataFrame must be of StringType; additional columns are ignored.

Why are the changes needed?

Parsing in-memory JSON text into a structured DataFrame currently requires sc.parallelize(), which is unavailable on Spark Connect. Accepting a DataFrame as input provides a Connect-compatible alternative. This is the inverse of DataFrame.toJSON().

Part of SPARK-55227.

Does this PR introduce any user-facing change?

Yes. spark.read.json() now accepts a DataFrame as input. The first column must be StringType; additional columns are ignored.

How was this patch tested?

New tests in test_datasources.py (classic) and test_connect_readwriter.py (Connect).

Was this patch authored or co-authored using generative AI tooling?

No

Comment thread python/pyspark/sql/readwriter.py Outdated
Comment thread python/pyspark/sql/tests/test_datasources.py
Comment thread python/pyspark/sql/connect/readwriter.py
Comment thread python/pyspark/sql/readwriter.py Outdated
Comment thread python/pyspark/sql/readwriter.py
Comment thread sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala Outdated
def ds: Dataset[String] = {
  val input = transformRelation(rel.getInput)
  val inputSchema = Dataset.ofRows(session, input).schema
  require(inputSchema.fields.length == 1,
Contributor:

Maybe we should throw InvalidInputErrors like the others do.

Contributor (author):

Since we don't want to validate on the Python side, all errors will be thrown on the Scala side, so in classic we will get UNSUPPORTED_DESERIALIZER errors. For parity tests, it might be better to keep the Connect error as UNSUPPORTED_DESERIALIZER as well, instead of InvalidInputErrors?

Contributor (author):

I ended up using InvalidInputErrors.


def test_json_with_dataframe_input_non_string_column(self):
    int_df = self.spark.createDataFrame([(1,), (2,)], schema="value INT")
    with self.assertRaises(Exception):
Contributor:

Consider using assertRaisesRegex(Exception, "exactly one column|StringType") to at least verify the error message content

Contributor (author):

Thanks! I added more regex patterns to make the check tighter.

Comment on lines +1764 to +1773
val input = transformRelation(rel.getInput)
val inputSchema = Dataset.ofRows(session, input).schema
if (inputSchema.fields.length != 1) {
  throw InvalidInputErrors.parseInputNotSingleColumn(inputSchema.fields.length)
}
if (inputSchema.fields.head.dataType != org.apache.spark.sql.types.StringType) {
  throw InvalidInputErrors.parseInputNotStringType(inputSchema.fields.head.dataType)
}
Dataset(session, input)(Encoders.STRING)
}
Contributor (author):

I added the checks here because otherwise INT can be implicitly cast to STRING.

      reader
    }

-   def ds: Dataset[String] = Dataset(session, transformRelation(rel.getInput))(Encoders.STRING)
+   def ds: Dataset[String] = {
Contributor:

Let's try to avoid creating the dataset twice. Analysis can be somewhat expensive. Do this instead:

val input = transformRelation(rel.getInput)
val df = Dataset.ofRows(session, input)
val inputSchema = df.schema
if (inputSchema.fields.length != 1) {
  throw InvalidInputErrors.parseInputNotSingleColumn(inputSchema.fields.length)
}
if (inputSchema.fields.head.dataType != org.apache.spark.sql.types.StringType) {
  throw InvalidInputErrors.parseInputNotStringType(inputSchema.fields.head.dataType)
}
df.as(Encoders.STRING)

Contributor (author):

done!

def ds: Dataset[String] = {
  val input = transformRelation(rel.getInput)
  val inputSchema = Dataset.ofRows(session, input).schema
  if (inputSchema.fields.length != 1) {
Contributor:

I am a bit on the fence about this one. It is fine to have multiple columns, as long as the first one is a string.

Contributor (author):

Hmm, just for clarification: what would the behavior be with multiple columns? Should we just take the first column and ignore the rest?

    throw InvalidInputErrors.parseInputNotSingleColumn(inputSchema.fields.length)
  }
  if (inputSchema.fields.head.dataType != org.apache.spark.sql.types.StringType) {
    throw InvalidInputErrors.parseInputNotStringType(inputSchema.fields.head.dataType)
Contributor:

This is technically a behavior change.

Comment thread sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala Outdated
if (fields.head.dataType != org.apache.spark.sql.types.StringType) {
  throw QueryCompilationErrors.parseInputNotStringTypeError(fields.head.dataType)
}
df.select(df.columns.head).as(Encoders.STRING)
Contributor:

You don't really have to add a projection here. df.as(Encoders.STRING) should work as well.

Contributor (author):

I tried removing the projection, but df.as(Encoders.STRING) on a multi-column DataFrame throws UNSUPPORTED_DESERIALIZER.FIELD_NUMBER_MISMATCH because the STRING encoder expects exactly one column. So the projection is needed to support multi-column DataFrames (using the first column). I'll keep it as-is.

Contributor:

> the projection is needed to support multi-column DataFrames (using the first column)

I think it should fail in this case?

Contributor (author):

I think @hvanhovell wants to support the multi-column case (#55097 (comment)), but I'm not sure how we should support multi-column input.
Currently it silently drops every column after the first when it receives more than one column. I could also change it to raise an exception. Or do we want to somehow join the remaining columns back after we parse the JSON from the first column?

@hvanhovell @zhengruifeng what do you think?

Contributor:

I personally feel it should just fail, but if we want to support multiple columns by accepting the first column, I think we need to document that behavior.
Also cc @cloud-fan and @HyukjinKwon: WDYT?

cloud-fan (Contributor) commented Apr 10, 2026:

Silently dropping things is an anti-pattern; let's fail explicitly.

Contributor (author):

OK, let me change it to fail in that case. Then we will not be able to support multi-column input.

hvanhovell (Contributor) left a comment:

LGTM - one small nit.

HyukjinKwon (Member):

Merged to master.

        )
        result = self.spark.read.json(multi_df)
        expected = [Row(name="Alice"), Row(name="Bob")]
        self.assertEqual(sorted(result.collect(), key=lambda r: r.name), expected)
Contributor:

I think in this case it should fail?

HyukjinKwon pushed a commit that referenced this pull request Apr 12, 2026
…me input in spark.read.json

### What changes were proposed in this pull request?

Follow-up to #55097. Reject multi-column DataFrame input in `spark.read.json()` explicitly instead of silently using the first column and dropping the rest.

Also renames error conditions and methods from `PARSE_INPUT_*` to `DATAFRAME_INPUT_*` since these are query compilation errors, not parse errors.

### Why are the changes needed?

Per review feedback on #55097 from cloud-fan and zhengruifeng: silently dropping columns is an anti-pattern. Multi-column DataFrame input should fail explicitly.

### Does this PR introduce _any_ user-facing change?

Yes. `spark.read.json(df)` now raises `DATAFRAME_INPUT_NOT_SINGLE_COLUMN` when the input DataFrame has more than one column (previously it silently used only the first column). Zero-column input now also raises `DATAFRAME_INPUT_NOT_SINGLE_COLUMN` instead of `PARSE_INPUT_NOT_STRING_TYPE`.

### How was this patch tested?

Updated existing tests in `test_datasources.py` (classic) and `test_connect_readwriter.py` (Connect) to verify that multi-column and zero-column input raises the expected error.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #55301 from Yicong-Huang/SPARK-56253-reject-multicol.

Authored-by: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon pushed a commit that referenced this pull request Apr 13, 2026
…nput

### What changes were proposed in this pull request?

This PR adds support for passing a `DataFrame` containing CSV strings directly to `spark.read.csv()`, following the same pattern established by #55097 (SPARK-56253) for `spark.read.json()`.

### Why are the changes needed?

Adding DataFrame support to `csv()` makes the API consistent with `json()` and enables Connect-compatible CSV parsing without `sc.parallelize()`.

### Does this PR introduce _any_ user-facing change?

Yes. `spark.read.csv()` now accepts a `DataFrame` with a single string column as input, in addition to the existing `str`, `list`, and `RDD` inputs.

```python
csv_df = spark.createDataFrame([("Alice,25",), ("Bob,30",)], schema="value STRING")
spark.read.csv(csv_df, schema="name STRING, age INT").show()
# +-----+---+
# | name|age|
# +-----+---+
# |Alice| 25|
# |  Bob| 30|
# +-----+---+
```

### How was this patch tested?

Added 10 new test cases.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #55274 from Yicong-Huang/SPARK-56255.

Authored-by: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>


6 participants